package com.atguigu.flink.window;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Created by Smexy on 2023/2/27
 *
 * <p>Rolling (incremental) aggregation: every element that enters the window
 * triggers one aggregation step. Built-in aggregations of this kind:
 * sum, max, min, minBy, maxBy.
 */
public class Demo5_Agg
{
    /**
     * Builds and runs a streaming job that reads text lines from a socket, maps them
     * with {@link WaterSensorMapFunction}, keys the stream by {@code WaterSensor::getId},
     * groups each key's elements into count windows of 3 and prints the sum of the
     * {@code vc} field for every window.
     *
     * @param args command-line arguments (not used)
     * @throws IllegalStateException if executing the job fails; the original exception
     *                               is attached as the cause
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Compute the water-level sum for each sensor.
        env
            .socketTextStream("hadoop103", 8888)
            // NOTE(review): presumably parses each text line into a WaterSensor —
            // confirm against WaterSensorMapFunction.
            .map(new WaterSensorMapFunction())
            .keyBy(WaterSensor::getId)
            .countWindow(3)
            // The result is sent downstream to print only after the window has closed.
            .sum("vc")
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate the failure instead of only printing it: otherwise main returns
            // normally and the caller cannot tell that the job failed.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }
}
